"""Authentication for HTTP component."""
import base64
import logging

from aiohttp import hdrs
from aiohttp.web import middleware
import jwt

from homeassistant.auth.providers import legacy_api_password
from homeassistant.auth.util import generate_secret
from homeassistant.const import HTTP_HEADER_HA_AUTH
from homeassistant.core import callback
from homeassistant.util import dt as dt_util

from .const import KEY_AUTHENTICATED, KEY_HASS_USER, KEY_REAL_IP


# mypy: allow-untyped-defs, no-check-untyped-defs

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)

# Query-string parameter name that may carry the legacy api_password.
DATA_API_PASSWORD = "api_password"
# Key in hass.data under which the secret used to sign paths is stored.
DATA_SIGN_SECRET = "http.auth.sign_secret"
# Query-string parameter name that carries the JWT signature of a signed path.
SIGN_QUERY_PARAM = "authSig"


@callback
def async_sign_path(hass, refresh_token_id, path, expiration):
    """Sign a path for temporary access without auth header.

    A HS256 JWT is created that carries the refresh token id as ``iss``, the
    path as ``path`` and the issue/expiry times as ``iat``/``exp``. The token
    is appended to ``path`` as the ``authSig`` query parameter.

    hass: object whose ``data`` mapping holds the signing secret.
    refresh_token_id: id stored in the ``iss`` claim.
    path: the path to sign; also used as the prefix of the returned string.
    expiration: duration added to the current time to form the ``exp`` claim
        (presumably a ``datetime.timedelta``; verify against callers).

    Returns the string ``"<path>?authSig=<token>"``.
    """
    secret = hass.data.get(DATA_SIGN_SECRET)

    if secret is None:
        # Create the signing secret lazily on first use and remember it so
        # later validation can read the same value from hass.data.
        secret = hass.data[DATA_SIGN_SECRET] = generate_secret()

    now = dt_util.utcnow()
    encoded = jwt.encode(
        {
            "iss": refresh_token_id,
            "path": path,
            "iat": now,
            "exp": now + expiration,
        },
        secret,
        algorithm="HS256",
    )

    # PyJWT 1.x returns bytes while PyJWT 2.x returns str; only decode when
    # needed so the function works with either version.
    if isinstance(encoded, bytes):
        encoded = encoded.decode()

    return "{}?{}={}".format(path, SIGN_QUERY_PARAM, encoded)


@callback
def setup_auth(hass, app):
    """Create auth middleware for the app.

    Builds a middleware that tries, in order: the Authorization header
    (Bearer token or legacy Basic auth), a signed GET path, trusted networks,
    and finally the legacy api_password supplied through a header or a query
    parameter. The outcome is stored on the request under KEY_AUTHENTICATED
    and, on success, the matching user under KEY_HASS_USER. The middleware is
    appended to ``app.middlewares``.
    """
    # Paths for which a deprecation notice has already been recorded, so the
    # notices below are not repeated for every request.
    old_auth_warning = set()

    support_legacy = hass.auth.support_legacy
    if support_legacy:
        _LOGGER.warning("legacy_api_password support has been enabled.")

    # Collect the networks of every trusted_networks auth provider once, at
    # setup time.
    trusted_networks = []
    for prv in hass.auth.auth_providers:
        if prv.type == "trusted_networks":
            trusted_networks += prv.trusted_networks

    async def async_validate_auth_header(request):
        """
        Test authorization header against access token.

        Basic auth_type is legacy code, should be removed with api_password.

        Returns True and sets request[KEY_HASS_USER] when the header is
        valid, otherwise returns False. Must only be called when the
        Authorization header is present on the request.
        """
        try:
            auth_type, auth_val = request.headers.get(hdrs.AUTHORIZATION).split(" ", 1)
        except ValueError:
            # If no space in authorization header
            return False

        if auth_type == "Bearer":
            refresh_token = await hass.auth.async_validate_access_token(auth_val)
            if refresh_token is None:
                return False

            request[KEY_HASS_USER] = refresh_token.user
            return True

        if auth_type == "Basic" and support_legacy:
            try:
                # The header value is untrusted: b64decode raises
                # binascii.Error on invalid base64 and .decode raises
                # UnicodeDecodeError on non-UTF-8 bytes. Both are ValueError
                # subclasses, as is the unpacking error when ':' is missing,
                # so a malformed header is rejected instead of raising.
                decoded = base64.b64decode(auth_val).decode("utf-8")
                username, password = decoded.split(":", 1)
            except ValueError:
                return False

            if username != "homeassistant":
                return False

            user = await legacy_api_password.async_validate_password(hass, password)
            if user is None:
                return False

            request[KEY_HASS_USER] = user
            _LOGGER.info(
                "Basic auth with api_password is going to deprecate,"
                " please use a bearer token to access %s from %s",
                request.path,
                request[KEY_REAL_IP],
            )
            old_auth_warning.add(request.path)
            return True

        return False

    async def async_validate_signed_request(request):
        """Validate a signed request.

        Returns True and sets request[KEY_HASS_USER] when the signature query
        parameter holds a valid JWT for this exact path whose ``iss`` claim
        resolves to a refresh token; otherwise returns False.
        """
        secret = hass.data.get(DATA_SIGN_SECRET)

        if secret is None:
            # No path has been signed yet, so no signature can be valid.
            return False

        signature = request.query.get(SIGN_QUERY_PARAM)

        if signature is None:
            return False

        try:
            # The "iss" claim holds a refresh token id that is looked up
            # below, so issuer verification is switched off here.
            claims = jwt.decode(
                signature, secret, algorithms=["HS256"], options={"verify_iss": False}
            )
        except jwt.InvalidTokenError:
            return False

        # A signature is only valid for the path it was issued for.
        if claims["path"] != request.path:
            return False

        refresh_token = await hass.auth.async_get_refresh_token(claims["iss"])

        if refresh_token is None:
            return False

        request[KEY_HASS_USER] = refresh_token.user
        return True

    async def async_validate_trusted_networks(request):
        """Test if request is from a trusted ip.

        On a match the request is attributed to the owner user returned by
        hass.auth.async_get_owner(); returns False when the address is not
        in any trusted network or no owner exists.
        """
        ip_addr = request[KEY_REAL_IP]

        if not any(ip_addr in trusted_network for trusted_network in trusted_networks):
            return False

        user = await hass.auth.async_get_owner()
        if user is None:
            return False

        request[KEY_HASS_USER] = user
        return True

    async def async_validate_legacy_api_password(request, password):
        """Validate api_password.

        Returns True and sets request[KEY_HASS_USER] when the password is
        accepted by the legacy_api_password provider, otherwise False.
        """
        user = await legacy_api_password.async_validate_password(hass, password)
        if user is None:
            return False

        request[KEY_HASS_USER] = user
        return True

    @middleware
    async def auth_middleware(request, handler):
        """Authenticate as middleware.

        Tries each supported mechanism in turn, stores the result under
        request[KEY_AUTHENTICATED] and always forwards the request to the
        handler; this middleware does not reject requests itself.
        """
        authenticated = False

        # Log the api_password deprecation once per path, regardless of
        # whether the password turns out to be valid.
        if HTTP_HEADER_HA_AUTH in request.headers or DATA_API_PASSWORD in request.query:
            if request.path not in old_auth_warning:
                _LOGGER.log(
                    logging.INFO if support_legacy else logging.WARNING,
                    "api_password is going to deprecate. You need to use a"
                    " bearer token to access %s from %s",
                    request.path,
                    request[KEY_REAL_IP],
                )
                old_auth_warning.add(request.path)

        if hdrs.AUTHORIZATION in request.headers and await async_validate_auth_header(
            request
        ):
            # Covers both Bearer access tokens and legacy api_password
            # Basic auth.
            authenticated = True

        # Signed paths are only honoured for GET requests that carry the
        # signature query parameter.
        elif (
            request.method == "GET"
            and SIGN_QUERY_PARAM in request.query
            and await async_validate_signed_request(request)
        ):
            authenticated = True

        elif trusted_networks and await async_validate_trusted_networks(request):
            if request.path not in old_auth_warning:
                # When removing this, don't forget to remove the print logic
                # in http/view.py
                request["deprecate_warning_message"] = (
                    "Access from trusted networks without auth token is "
                    "going to be removed in Home Assistant 0.96. Configure "
                    "the trusted networks auth provider or use long-lived "
                    "access tokens to access {} from {}".format(
                        request.path, request[KEY_REAL_IP]
                    )
                )
                old_auth_warning.add(request.path)
            authenticated = True

        # Legacy api_password supplied through the dedicated header.
        elif (
            support_legacy
            and HTTP_HEADER_HA_AUTH in request.headers
            and await async_validate_legacy_api_password(
                request, request.headers[HTTP_HEADER_HA_AUTH]
            )
        ):
            authenticated = True

        # Legacy api_password supplied through the query string.
        elif (
            support_legacy
            and DATA_API_PASSWORD in request.query
            and await async_validate_legacy_api_password(
                request, request.query[DATA_API_PASSWORD]
            )
        ):
            authenticated = True

        request[KEY_AUTHENTICATED] = authenticated
        return await handler(request)

    app.middlewares.append(auth_middleware)
